/*
 * Copyright (C) 2022-2022. Huawei Technologies Co., Ltd. All rights reserved.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.openlookeng.service.impl;

import cn.hutool.core.io.FileUtil;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.EnumUtil;
import cn.hutool.http.HttpUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.jcraft.jsch.SftpException;
import com.openlookeng.core.boot.dto.LoginUserInfoDTO;
import com.openlookeng.core.boot.jwt.model.Audience;
import com.openlookeng.core.boot.utils.Constant;
import com.openlookeng.core.boot.utils.JwtTokenUtil;
import com.openlookeng.core.dto.SshUserDTO;
import com.openlookeng.core.tool.Md5Utils;
import com.openlookeng.dto.UpdateClusterDTO;
import com.openlookeng.entity.Ecs;
import com.openlookeng.entity.OpenlookengCluster;
import com.openlookeng.entity.OpenlookengClusterConfig;
import com.openlookeng.entity.SysFile;
import com.openlookeng.enums.ConfigSynStateEnum;
import com.openlookeng.enums.ConfigTypeEnum;
import com.openlookeng.enums.NodeTypeEnum;
import com.openlookeng.exception.BizException;
import com.openlookeng.mapper.EcsMapper;
import com.openlookeng.mapper.OpenlookengClusterConfigMapper;
import com.openlookeng.mapper.OpenlookengClusterMapper;
import com.openlookeng.service.IEcsService;
import com.openlookeng.service.IFileService;
import com.openlookeng.service.IOpenLookengService;
import com.openlookeng.service.IOpenlookengClusterConfigService;
import com.openlookeng.service.IOpenlookengClusterService;
import com.openlookeng.utils.LinuxShellUtils;
import com.openlookeng.utils.ShellConstants;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

import java.io.File;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Slf4j
@Service
public class OpenLookengServiceImpl
        implements IOpenLookengService
{
    // Node (OpenlookengCluster) lookups: by id, by cluster code, or for the current user.
    @Autowired
    private IOpenlookengClusterService openlookengClusterService;
    // Reads, deletes and saves the per-node and default configuration entries.
    @Autowired
    private IOpenlookengClusterConfigService openlookengClusterConfigService;
    @Resource
    private OpenlookengClusterConfigMapper openlookengClusterConfigMapper;
    // Installation package (SysFile) metadata.
    @Autowired
    private IFileService fileService;
    // Host (Ecs) records and their cached OS information.
    @Autowired
    private IEcsService ecsService;
    // Local base directory, injected from the "openlookeng.fileDir" property; generated
    // configuration files and downloaded packages are placed beneath it.
    @Value("${openlookeng.fileDir}")
    private String fileDir;
    // Passed to JwtTokenUtil.getCurrentLoginUser to resolve the current login user.
    @Autowired
    private Audience audience;
    @Resource
    private OpenlookengClusterMapper openlookengClusterMapper;
    @Resource
    private EcsMapper ecsMapper;

    /**
     * Resolves the deployment path of a node and invokes
     * {@code LinuxShellUtils.createDirAndPermit} on the node's host for both that
     * path and the node's upload directory ({@code <installationPath>upload/}).
     *
     * @param openlookengCluster node whose host, installation path and deploy path are used
     * @return the deployment path returned by {@code LinuxShellUtils.getDeployPath}
     */
    public String getDeployPath(OpenlookengCluster openlookengCluster)
    {
        final String deployDir = LinuxShellUtils.getDeployPath(openlookengCluster);
        final String uploadDir = openlookengCluster.getInstallationPath() + "upload/";

        // SSH credentials are taken from the host record the node points at
        final Ecs host = ecsService.getById(openlookengCluster.getEcsId());
        final SshUserDTO credentials = new SshUserDTO();
        credentials.setSshHost(host.getIp());
        credentials.setSshPort(host.getPort());
        credentials.setSshUser(host.getUserName());
        credentials.setSshPass(Md5Utils.decrypt(host.getPassWord()));

        LinuxShellUtils.createDirAndPermit(credentials, deployDir);
        LinuxShellUtils.createDirAndPermit(credentials, uploadDir);
        return deployDir;
    }

    /**
     * Maps a configuration type string to its {@link ConfigTypeEnum} constant.
     * The lookup is done by enum constant name, ignoring the case of {@code type}.
     *
     * @param type configuration type name; may be {@code null}
     * @return the matching constant, or {@link ConfigTypeEnum#UN_KNOWN} when {@code type}
     *         is {@code null} or matches no constant name
     */
    public ConfigTypeEnum getConfigTypeEnumByType(String type)
    {
        if (type == null) {
            return ConfigTypeEnum.UN_KNOWN;
        }
        Map<String, ConfigTypeEnum> enumByName = EnumUtil.getEnumMap(ConfigTypeEnum.class);
        // Locale.ROOT keeps the upper-casing independent of the JVM default locale
        // (e.g. a Turkish locale would turn "i" into a dotted capital I and miss the constant)
        ConfigTypeEnum configTypeEnum = enumByName.get(type.toUpperCase(java.util.Locale.ROOT));
        return configTypeEnum == null ? ConfigTypeEnum.UN_KNOWN : configTypeEnum;
    }

    /**
     * Generates the configuration files of a single node on the local disk and uploads
     * them to that node.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>keep only the entries of {@code clusterConfigList} that belong to this node and its version;</li>
     *   <li>resolve the deploy path, call {@code LinuxShellUtils.delConfigFile} for it and delete the
     *       files previously generated in the local staging directory;</li>
     *   <li>enable the HA settings when a file system and a state store are both configured and the
     *       cluster has at least two coordinator-type nodes;</li>
     *   <li>for every configuration type and configuration group, build the file content (adjusted to
     *       the node role for the general configuration), write it locally and upload it.</li>
     * </ol>
     * A failure while handling one configuration group is logged and does not stop the other groups.
     *
     * @param openlookengCluster node the configuration is generated for
     * @param clusterConfigList candidate configuration entries; entries of other nodes or versions are ignored
     * @param sshUserDTO SSH credentials used for the remote operations of this method
     */
    public void deployNodeConfigFile(OpenlookengCluster openlookengCluster, List<OpenlookengClusterConfig> clusterConfigList, SshUserDTO sshUserDTO)
    {
        long start = System.currentTimeMillis();
        log.info("{}:节点开始同步配置文件,ip:{}", openlookengCluster.getId(), sshUserDTO.getSshHost());
        List<OpenlookengClusterConfig> nodeConfigList = clusterConfigList.stream()
                .filter(a -> openlookengCluster.getId().equals(a.getOpenlookengClusterId())
                        && openlookengCluster.getVersion().equalsIgnoreCase(a.getVersion()))
                .collect(Collectors.toList());
        //group by type
        Map<String, List<OpenlookengClusterConfig>> configMap = nodeConfigList.stream().collect(Collectors.groupingBy(OpenlookengClusterConfig::getType));
        String deployPath = getDeployPath(openlookengCluster);
        //Clear the configuration files already present on the node
        LinuxShellUtils.delConfigFile(sshUserDTO, deployPath);
        //Local staging directory of this node; files are written here before being uploaded
        String fullPath = fileDir + deployPath + openlookengCluster.getId() + "/";
        if (FileUtil.exist(fullPath)) {
            //File.list() returns null when the path is not a directory or cannot be read
            String[] staleFileNames = new File(fullPath).list();
            if (staleFileNames != null) {
                for (String staleFileName : staleFileNames) {
                    String staleFilePath = fullPath + staleFileName;
                    try {
                        FileUtil.del(staleFilePath);
                    }
                    catch (Exception e) {
                        log.error("del configFile:{}, fail:{}", staleFilePath, e.getMessage());
                    }
                }
            }
        }
        String clusterCode = openlookengCluster.getClusterCode();
        List<OpenlookengCluster> openlookengClusterList = openlookengClusterService.listClusterByCode(clusterCode);
        //number of coordinator nodes
        long masterCount = openlookengClusterList.stream()
                .filter(h -> NodeTypeEnum.COORDINATOR.listMasterType().contains(h.getNodeType()))
                .count();
        //Whether the file system is configured (constant-first comparison tolerates a null type group)
        boolean ifConfigFileSystem = nodeConfigList.stream()
                .anyMatch(u -> ConfigTypeEnum.FILESYSTEM.getType().equalsIgnoreCase(u.getTypeGroup()));
        //Whether state storage is configured
        boolean ifConfigStateStore = nodeConfigList.stream()
                .anyMatch(u -> ConfigTypeEnum.STATESTORE.getType().equalsIgnoreCase(u.getTypeGroup()));
        //HA mode: file system and state store configured and at least two coordinator nodes
        boolean isHa = ifConfigFileSystem && ifConfigStateStore && masterCount >= 2;
        if (isHa) {
            log.info("Cluster opens Ha mode");
        }
        String nodeType = openlookengCluster.getNodeType();
        configMap.forEach((k, v) -> {
            ConfigTypeEnum configTypeEnum = getConfigTypeEnumByType(k);
            //Group by configuration group
            Map<String, List<OpenlookengClusterConfig>> configGroupMap = v.stream().collect(Collectors.groupingBy(OpenlookengClusterConfig::getConfigGroup));
            int groupSize = configGroupMap.size();
            configGroupMap.forEach((kk, vv) -> {
                try {
                    String filePath = vv.stream().map(a -> a.getFilePath()).distinct().findFirst().orElse("");
                    String fullFilePath = fullPath;
                    //Several groups of the same type: name each file after its group and keep the
                    //text after the first '.' of the configured path as the extension
                    if (groupSize > 1) {
                        String[] filePathArry = filePath.split("\\.");
                        fullFilePath = fullFilePath + kk.toLowerCase() + "." + filePathArry[1];
                    }
                    else {
                        fullFilePath = fullFilePath + filePath;
                    }
                    Map<String, String> configFileMap = new HashMap<>();
                    for (OpenlookengClusterConfig a : vv) {
                        //If it is a file type, you need to transfer the file to the corresponding server
                        if ("file".equalsIgnoreCase(a.getProType())) {
                            String proVal = a.getProValue();
                            if (StringUtils.isNotBlank(proVal)) {
                                for (String f : proVal.split(",")) {
                                    File configFile = new File(fileDir + f);
                                    if (configFile.exists()) {
                                        //NOTE(review): the whole property value (possibly a comma separated list)
                                        //is passed as the remote target, not the single entry - confirm intent
                                        LinuxShellUtils.uploadFileToLinux(sshUserDTO, configFile, a.getProValue());
                                    }
                                }
                            }
                        }
                        configFileMap.put(a.getProKeyName(), a.getProValue());
                    }
                    List<String> newLines;
                    //jvm is not a key-value pair
                    if (configTypeEnum.getType().equalsIgnoreCase(ConfigTypeEnum.JVM.getType())) {
                        newLines = vv.stream().map(a -> a.getProValue()).collect(Collectors.toList());
                    }
                    else {
                        //If it is a general type, you need to increase or decrease the configuration according to the node role
                        if (configTypeEnum.getType().equalsIgnoreCase(ConfigTypeEnum.CONFIG.getType())) {
                            //Worker Node
                            if (NodeTypeEnum.WORKER_NODES.getType().equalsIgnoreCase(nodeType)) {
                                configFileMap.put("coordinator", "false");
                                configFileMap.remove("discovery-server.enabled"); //Remove this attribute or it won't work
                                if (isHa) {
                                    configFileMap.put("hetu.multiple-coordinator.enabled", "true"); // Ha
                                }
                            }
                            //Coordinator Node
                            if (NodeTypeEnum.COORDINATOR.getType().equalsIgnoreCase(nodeType)) {
                                configFileMap.put("coordinator", "true");
                                configFileMap.put("node-scheduler.include-coordinator", "false");
                                configFileMap.put("discovery-server.enabled", "true");
                                if (isHa) {
                                    configFileMap.put("hetu.multiple-coordinator.enabled", "true");
                                    configFileMap.put("hetu.embedded-state-store.enabled", "true");
                                }
                            }
                            //Worker Node + Coordinator Node
                            if (NodeTypeEnum.ALL.getType().equalsIgnoreCase(nodeType)) {
                                configFileMap.put("coordinator", "true");
                                configFileMap.put("node-scheduler.include-coordinator", "true");
                                configFileMap.put("discovery-server.enabled", "true");
                                if (isHa) {
                                    configFileMap.put("hetu.multiple-coordinator.enabled", "true");
                                    configFileMap.put("hetu.embedded-state-store.enabled", "true");
                                }
                            }
                        }
                        //A state storage configuration is not issued to a node whose role is coordinator only
                        if (configTypeEnum.getType().equalsIgnoreCase(ConfigTypeEnum.STATESTORE.getType())
                                && NodeTypeEnum.COORDINATOR.getType().equalsIgnoreCase(nodeType)) {
                            configFileMap.clear();
                        }
                        newLines = convertMapToLines(configFileMap);
                    }
                    //There is content written to the file sync server
                    if (!newLines.isEmpty() && StringUtils.isNotBlank(filePath)) {
                        File file = new File(fullFilePath);
                        FileUtil.appendLines(newLines, file, Charset.defaultCharset());
                        //Remote directory = directory part of the configured path without its first character
                        String remoteDir = filePath.substring(0, filePath.lastIndexOf("/") + 1);
                        remoteDir = remoteDir.substring(1, remoteDir.length());
                        LinuxShellUtils.uploadFileToLinux(sshUserDTO, file, deployPath + remoteDir);
                    }
                }
                catch (Exception e) {
                    //Best effort per group: report the failure with its stack trace and go on with the next group
                    log.error("{}:The node starts to synchronize the configuration file abnormally,ip:{},err:{}", openlookengCluster.getId(), sshUserDTO.getSshHost(), e.getMessage(), e);
                }
            });
        });
        long end = System.currentTimeMillis();
        log.info("{}:Node starts sync configuration file complete,ip:{},time consuming:{}", openlookengCluster.getId(), sshUserDTO.getSshHost(), (end - start));
    }

    /**
     * Renders every entry of a property map as one {@code key=value} line.
     *
     * @param map properties to render; lines follow the iteration order of the map
     * @return a new list holding one line per map entry
     */
    public List<String> convertMapToLines(Map<String, String> map)
    {
        return map.entrySet().stream()
                .map(entry -> entry.getKey() + "=" + entry.getValue())
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Deploys the installation package to one node: uploads the package when the node's
     * upload directory does not already list it, unpacks it and optionally restarts the node.
     *
     * <p>This method is best effort: any exception is logged (with its stack trace) and
     * not propagated to the caller.
     *
     * @param ecs host the node runs on
     * @param openlookengCluster node to deploy
     * @param sysFile installation package to deploy
     * @param clusterConfigList configuration entries of the node; not used by this method
     * @param isStart whether to restart the node after unpacking; {@code null} is treated as {@code false}
     */
    public void deployNode(Ecs ecs, OpenlookengCluster openlookengCluster, SysFile sysFile, List<OpenlookengClusterConfig> clusterConfigList, Boolean isStart)
    {
        long start = System.currentTimeMillis();
        boolean deployed = false;
        try {
            log.info("Start deploying nodes:{},ip:{}", openlookengCluster.getId(), ecs.getIp());
            String path = getDeployPath(openlookengCluster);
            String uploadPath = openlookengCluster.getInstallationPath() + "upload/";
            SshUserDTO sshUserDTO = LinuxShellUtils.getOpenLookengUser(ecs);
            String unpackFilePath = openlookengCluster.getInstallationPath();
            String fileName = sysFile.getFileName();
            List<String> fileNameList = LinuxShellUtils.listFile(sshUserDTO, uploadPath);
            if (!fileNameList.contains(fileName)) {
                //The package is not in the upload directory yet
                long uploadStart = System.currentTimeMillis();
                log.info("{}:Prepare to upload the compressed package>>>>>>>>>>>>>>>>>>>fileName:{}", ecs.getIp(), fileName);
                File openLookengFile = new File(sysFile.getFilePath());
                if (!openLookengFile.exists()) {
                    throw new BizException("安装包不存在!");
                }
                //Upload the installation package to the corresponding server
                Boolean res = LinuxShellUtils.uploadFileToLinux(sshUserDTO, openLookengFile, uploadPath);
                long uploadEnd = System.currentTimeMillis();
                log.info("{}:The compressed package is uploaded,res:{}>>>>>>>>>>>>>>>>>>>time consuming:{}", ecs.getIp(), res, (uploadEnd - uploadStart));
            }
            //decompress
            LinuxShellUtils.unpackFile(sshUserDTO, path, sysFile, unpackFilePath);
            //Configuration files are not deployed here; per the original author's note that
            //is part of the restart logic
            if (Boolean.TRUE.equals(isStart)) {
                restartNode(openlookengCluster.getId());
            }
            deployed = true;
        }
        catch (Exception e) {
            log.error("Deployment node exception:{},ip:{},err:{}", openlookengCluster.getId(), ecs.getIp(), e.getMessage(), e);
        }
        long end = System.currentTimeMillis();
        if (deployed) {
            log.info("Node deployment is complete:{},ip:{},time consuming:{}", openlookengCluster.getId(), ecs.getIp(), (end - start));
        }
        else {
            log.error("Node deployment failed:{},ip:{},time consuming:{}", openlookengCluster.getId(), ecs.getIp(), (end - start));
        }
    }

    /**
     * Generates and uploads the configuration files of one node, using the SSH user
     * that {@code LinuxShellUtils.getOpenLookengUser} returns for the given host.
     *
     * @param ecs host the node runs on
     * @param openlookengCluster node whose configuration is deployed
     * @param clusterConfigList configuration entries to deploy
     */
    @Override
    public void deployNodeConfigFile(Ecs ecs, OpenlookengCluster openlookengCluster, List<OpenlookengClusterConfig> clusterConfigList)
    {
        deployNodeConfigFile(openlookengCluster, clusterConfigList, LinuxShellUtils.getOpenLookengUser(ecs));
    }

    /**
     * Loads a node, its configuration entries (for the node's creator and current
     * version) and its host, then deploys the node's configuration files.
     *
     * @param nodeId id of the node
     */
    public void deployNodeConfigFile(Integer nodeId)
    {
        final OpenlookengCluster node = openlookengClusterService.getById(nodeId);
        final List<OpenlookengClusterConfig> nodeConfigs = openlookengClusterConfigService.listClusterConfig(
                node.getCreateUserId(),
                node.getId(),
                node.getVersion());
        final Ecs host = ecsService.getById(node.getEcsId());
        deployNodeConfigFile(host, node, nodeConfigs);
    }
    /**
     * Deploys the configuration files of every node that belongs to the given cluster code.
     *
     * @param clusterCode code identifying the cluster
     */
    @Override
    public void deployClusterConfigFile(String clusterCode)
    {
        final List<OpenlookengCluster> nodes = openlookengClusterService.listClusterByCode(clusterCode);
        final List<OpenlookengClusterConfig> clusterConfigs = openlookengClusterConfigService.listConfigByClusterCode(clusterCode);
        for (OpenlookengCluster node : nodes) {
            // the host is looked up per node; the config list is shared by all nodes
            Ecs host = ecsService.getById(node.getEcsId());
            this.deployNodeConfigFile(host, node, clusterConfigs);
        }
    }

    /**
     * Deploys the given installation package to all nodes of the current login user.
     * The id of the current user is used as the cluster code.
     *
     * <p>Coordinator-type nodes are deployed before the remaining (worker) nodes; otherwise,
     * under HA, a worker that starts before its coordinator reports an error.
     *
     * @param fileId id of the installation package
     * @param isStart whether the nodes are started after deployment
     */
    @Override
    public void deployCluster(Integer fileId, Boolean isStart)
    {
        LoginUserInfoDTO currentLoginUser = JwtTokenUtil.getCurrentLoginUser(audience);
        String clusterCode = currentLoginUser.getId();
        List<OpenlookengCluster> openlookengClusterList = openlookengClusterService.listClusterByCode(clusterCode);
        if (openlookengClusterList == null || openlookengClusterList.isEmpty()) {
            //Nothing to deploy; also avoids querying hosts with an empty id list
            log.info("user:{} has no nodes to deploy", clusterCode);
            return;
        }
        List<Integer> ecsIdList = openlookengClusterList.stream().map(a -> a.getEcsId()).collect(Collectors.toList());
        List<Ecs> ecsList = ecsService.listByIds(ecsIdList);
        //true -> coordinator-type nodes, false -> worker nodes; list order is kept inside each group
        List<String> masterTypeList = NodeTypeEnum.COORDINATOR.listMasterType();
        Map<Boolean, List<OpenlookengCluster>> nodesByRole = openlookengClusterList.stream()
                .collect(Collectors.partitioningBy(t -> masterTypeList.stream().anyMatch(h -> h.equalsIgnoreCase(t.getNodeType()))));
        List<OpenlookengCluster> coordinatorList = nodesByRole.get(true);
        List<OpenlookengCluster> workerList = nodesByRole.get(false);
        List<String> ids = Stream.concat(coordinatorList.stream(), workerList.stream()).map(item -> item.getId().toString()).distinct().collect(Collectors.toList());
        log.info("user:{},deployed{}node", clusterCode, ids);
        //Coordinators first, workers second (see method comment)
        deployNodeList(fileId, coordinatorList, ecsList, isStart);
        deployNodeList(fileId, workerList, ecsList, isStart);
    }

    /**
     * Upgrades nodes of the current login user to the version of the package named in the request.
     * When the request lists no node ids, all nodes of the user's cluster are upgraded
     * (the user id is used as the cluster code). Non-worker nodes are upgraded before
     * worker nodes.
     *
     * @param updateClusterDTO upgrade request (node ids, package id, start flag)
     */
    @Override
    public void updateClusterVersion(UpdateClusterDTO updateClusterDTO)
    {
        LoginUserInfoDTO currentLoginUser = JwtTokenUtil.getCurrentLoginUser(audience);
        String clusterCode = currentLoginUser.getId();
        List<Integer> nodeIdList = updateClusterDTO.getNodeIds();
        List<OpenlookengCluster> openlookengClusterList;
        //A missing node id list is treated like an empty one: upgrade the whole cluster
        if (nodeIdList == null || nodeIdList.isEmpty()) {
            openlookengClusterList = openlookengClusterService.listClusterByCode(clusterCode);
        }
        else {
            openlookengClusterList = openlookengClusterService.listByIds(nodeIdList);
        }
        List<String> ids = openlookengClusterList.stream().map(item -> item.getId().toString()).distinct().collect(Collectors.toList());
        log.info("用户:{},对节点{}执行了升级操作", clusterCode, ids);
        //true -> nodes whose type is NODE_TYPE_1 or NODE_TYPE_4 (handled as worker nodes), false -> all others
        Map<Boolean, List<OpenlookengCluster>> nodesByWorkerRole = openlookengClusterList.stream()
                .collect(Collectors.partitioningBy(item -> Constant.NODE_TYPE_1.getCode().equalsIgnoreCase(item.getNodeType())
                        || Constant.NODE_TYPE_4.getCode().equalsIgnoreCase(item.getNodeType())));
        this.updateClusterVersion(nodesByWorkerRole.get(false), updateClusterDTO);
        this.updateClusterVersion(nodesByWorkerRole.get(true), updateClusterDTO);
    }

    /**
     * Upgrades one group of nodes after checking that every host involved is available.
     * A host is treated as unavailable when its cached OS information is missing or has
     * at most one entry.
     *
     * @param list nodes to upgrade; nothing happens when it is empty
     * @param updateClusterDTO upgrade request (package id, start flag)
     * @throws BizException when one of the hosts is unavailable
     */
    private void updateClusterVersion(List<OpenlookengCluster> list, UpdateClusterDTO updateClusterDTO)
    {
        if (list == null || list.isEmpty()) {
            //e.g. only coordinators were selected: avoids querying hosts with an empty id list
            return;
        }
        List<Integer> ecsIdList = list.stream().map(a -> a.getEcsId()).collect(Collectors.toList());
        List<Ecs> ecsList = ecsService.listByIds(ecsIdList);
        for (Ecs a : ecsList) {
            Map<String, String> osInfo = ecsService.getOsInfoFromCache(a);
            if (osInfo == null || osInfo.size() <= 1) {
                throw new BizException("主机:" + a.getIp() + "已关闭无法升级!");
            }
        }
        updateVersionNodeList(updateClusterDTO.getFileId(), list, ecsList, updateClusterDTO.getIsStart());
    }

    /**
     * Upgrades every node whose version differs from the version of the given package.
     *
     * <p>For each such node the configuration entries of its current version are carried over
     * to the new version (only keys that exist in the new version's default configuration).
     * If the node's host is in {@code ecsList}, a task is submitted that stops the node,
     * removes its deploy directory, stores the new version and redeploys the package.
     * The method waits for all submitted tasks and finally carries over the cluster-wide
     * configuration (node id {@code -1}) in the same way.
     *
     * @param sysFileId id of the installation package to upgrade to
     * @param openlookengClusterList nodes to check and upgrade
     * @param ecsList hosts of the nodes; a node without a matching host is not redeployed
     * @param isStart whether nodes are started after the upgrade; {@code null} is treated as {@code false}
     *                when marking the configuration as synchronised
     * @throws BizException when the new version has no default configuration entries
     */
    public void updateVersionNodeList(Integer sysFileId, List<OpenlookengCluster> openlookengClusterList, List<Ecs> ecsList, Boolean isStart)
    {
        SysFile sysFile = fileService.getById(sysFileId);
        String newVersion = sysFile.getVersion();
        String clusterVersion = this.getVersion(-1).getOrDefault("version", "");
        List<OpenlookengClusterConfig> defaultClusterConfigList = openlookengClusterConfigService.listSysDefaultConfig(newVersion);
        if (defaultClusterConfigList.isEmpty()) {
            throw new BizException(newVersion + "版本没有默认配置项,请更换版本!");
        }
        boolean anyNodeOutdated = openlookengClusterList.stream().anyMatch(a -> !a.getVersion().equalsIgnoreCase(newVersion));
        if (!anyNodeOutdated) {
            return;
        }
        LoginUserInfoDTO loginUserInfoDTO = JwtTokenUtil.getCurrentLoginUser(audience);
        Integer userId = Integer.valueOf(loginUserInfoDTO.getId());
        CompletionService<OpenlookengCluster> completionService = ThreadUtil.newCompletionService();
        //Number of tasks actually submitted; only that many results may be taken below,
        //otherwise take() would block forever for nodes that were skipped
        int submittedTasks = 0;
        for (OpenlookengCluster t : openlookengClusterList) {
            //Upgrade only if the version is different
            if (t.getVersion().equalsIgnoreCase(newVersion)) {
                continue;
            }
            Ecs ecs = ecsList.stream().filter(y -> y.getId().equals(t.getEcsId())).findFirst().orElse(null);
            migrateConfigToNewVersion(userId, t.getId(), t.getVersion(), newVersion, defaultClusterConfigList);
            if (ecs == null) {
                log.warn("node:{},no host found for ecsId:{},node is not redeployed", t.getId(), t.getEcsId());
                continue;
            }
            //new configuration list
            List<OpenlookengClusterConfig> newConfigList = openlookengClusterConfigService.listClusterConfig(userId, t.getId(), newVersion);
            Callable<OpenlookengCluster> callableTask = () -> {
                changeNodeState(t.getId(), "stop");
                SshUserDTO sshUserDTO = new SshUserDTO();
                sshUserDTO.setSshUser(ecs.getUserName());
                sshUserDTO.setSshPass(Md5Utils.decrypt(ecs.getPassWord()));
                sshUserDTO.setSshHost(ecs.getIp());
                sshUserDTO.setSshPort(ecs.getPort());
                String deployPath = this.getDeployPath(t);
                LinuxShellUtils.removeDirectory(sshUserDTO, deployPath);
                //Store the new version before redeploying
                t.setVersion(newVersion);
                openlookengClusterService.updateById(t);
                deployNode(ecs, t, sysFile, newConfigList, isStart);
                if (Boolean.TRUE.equals(isStart)) {
                    t.setConfigSynState(ConfigSynStateEnum.SYN.getType());
                }
                t.setVersion(newVersion);
                openlookengClusterService.updateById(t);
                return t;
            };
            completionService.submit(callableTask);
            submittedTasks++;
        }
        for (int i = 0; i < submittedTasks; i++) {
            try {
                OpenlookengCluster openlookengCluster = completionService.take().get();
                log.info("node:{},is update", openlookengCluster.getId());
            }
            catch (InterruptedException e) {
                //Restore the interrupt flag and stop waiting for the remaining tasks
                Thread.currentThread().interrupt();
                log.error("interrupted while waiting for node updates", e);
                break;
            }
            catch (Exception e) {
                log.error("node update err:{}", e.getMessage(), e);
            }
        }

        //Version for the entire cluster (node id -1)
        migrateConfigToNewVersion(userId, -1, clusterVersion, newVersion, defaultClusterConfigList);
    }

    /**
     * Carries the configuration entries of {@code oldVersion} over to {@code newVersion} for one node:
     * existing entries of the new version are deleted, then the old entries whose key exists in the
     * new version's default configuration are saved again as new rows of the new version.
     */
    private void migrateConfigToNewVersion(Integer userId, Integer nodeId, String oldVersion, String newVersion, List<OpenlookengClusterConfig> defaultClusterConfigList)
    {
        //old version configuration
        List<OpenlookengClusterConfig> oldConfigList = openlookengClusterConfigService.listClusterConfig(userId, nodeId, oldVersion);
        //Delete the new version configuration
        openlookengClusterConfigService.delClusterConfig(userId, nodeId, newVersion);
        List<OpenlookengClusterConfig> needToAddList = oldConfigList.stream()
                .filter(h -> defaultClusterConfigList.stream().anyMatch(o -> o.getProKeyName().equalsIgnoreCase(h.getProKeyName())))
                .collect(Collectors.toList());
        needToAddList.forEach(m -> {
            m.setVersion(newVersion);
            //A null id makes the entry a new row instead of an update of the old one
            m.setId(null);
        });
        openlookengClusterConfigService.saveBatch(needToAddList);
    }

    /**
     * Deploys the given installation package to a list of nodes in parallel and waits
     * until every submitted deployment has finished.
     *
     * @param sysFileId id of the installation package
     * @param openlookengClusterList nodes to deploy; nothing happens when it is empty
     * @param ecsList hosts of the nodes; nothing happens when it is empty, and a node
     *                without a matching host is skipped with a warning
     * @param isStart whether nodes are started after deployment
     */
    @Override
    public void deployNodeList(Integer sysFileId, List<OpenlookengCluster> openlookengClusterList, List<Ecs> ecsList, Boolean isStart)
    {
        if (openlookengClusterList.isEmpty() || ecsList.isEmpty()) {
            return;
        }
        SysFile sysFile = fileService.getById(sysFileId);
        LoginUserInfoDTO loginUserInfoDTO = JwtTokenUtil.getCurrentLoginUser(audience);
        Integer userId = Integer.valueOf(loginUserInfoDTO.getId());
        CompletionService<OpenlookengCluster> completionService = ThreadUtil.newCompletionService();
        //Only as many results as submitted tasks may be taken, otherwise take() blocks forever
        int submittedTasks = 0;
        for (OpenlookengCluster t : openlookengClusterList) {
            Ecs ecs = ecsList.stream().filter(y -> y.getId().equals(t.getEcsId())).findFirst().orElse(null);
            if (ecs == null) {
                log.warn("node:{},no host found for ecsId:{},deployment skipped", t.getId(), t.getEcsId());
                continue;
            }
            List<OpenlookengClusterConfig> clusterConfigList = openlookengClusterConfigService.listClusterConfig(userId, t.getId(), t.getVersion());
            Callable<OpenlookengCluster> callableTask = () -> {
                deployNode(ecs, t, sysFile, clusterConfigList, isStart);
                return t;
            };
            completionService.submit(callableTask);
            submittedTasks++;
        }
        for (int i = 0; i < submittedTasks; i++) {
            try {
                OpenlookengCluster openlookengCluster = completionService.take().get();
                log.info("node:{},is deployed", openlookengCluster.getId());
            }
            catch (InterruptedException e) {
                //Restore the interrupt flag and stop waiting for the remaining tasks
                Thread.currentThread().interrupt();
                log.error("interrupted while waiting for node deployments", e);
                break;
            }
            catch (Exception e) {
                log.error("node deployed err:{}", e.getMessage(), e);
            }
        }
    }

    /**
     * Downloads the installation packages returned by {@code fileService.listFileBySystemUpLoad()}
     * into {@code <fileDir>/download/} when they are not present yet, and stores the local path
     * on each package record.
     *
     * <p>The method returns without doing anything when the download site cannot be reached.
     * A package without a download URL, or whose download fails, is skipped so that the
     * remaining packages are still processed.
     */
    @Override
    public void downloadDefaultVersion()
    {
        try {
            //Connectivity probe only; the response body is not needed
            HttpUtil.get("https://download.openlookeng.io/", 1000);
        }
        catch (Exception e) {
            log.info("No access to the Internet, no download！");
            return;
        }
        List<SysFile> sysFileList = fileService.listFileBySystemUpLoad();
        for (SysFile a : sysFileList) {
            String downLoadUrl = a.getDownloadUrl();
            if (StringUtils.isBlank(downLoadUrl)) {
                log.warn("Installation package has no download url, skipped,version:{}", a.getVersion());
                continue;
            }
            String fileName = downLoadUrl.substring(downLoadUrl.lastIndexOf("/") + 1);
            String filePath = fileDir + "/download/" + fileName;
            File file = new File(filePath);
            if (!file.exists()) {
                log.info("Start downloading the latest openlookeng installation package,version:{}!", a.getVersion());
                try {
                    HttpUtil.downloadFile(downLoadUrl, filePath);
                }
                catch (Exception e) {
                    log.error("Download of openlookeng installation package failed,version:{},url:{},err:{}", a.getVersion(), downLoadUrl, e.getMessage(), e);
                    //Remove a partial file so that it is not mistaken for a finished download next time
                    if (file.exists() && !file.delete()) {
                        log.warn("Could not delete incomplete download:{}", filePath);
                    }
                    continue;
                }
                log.info("Start to download the latest openlookeng installation package download complete,version:{}!", a.getVersion());
            }
            a.setFilePath(filePath);
            fileService.updateById(a);
        }
    }

    /**
     * Restarts a node by requesting the {@code "restart"} state change for it.
     *
     * @param nodeId id of the node, passed on unchanged to {@code changeNodeState}
     */
    @Override
    public void restartNode(Integer nodeId)
    {
        final String targetState = "restart";
        this.changeNodeState(nodeId, targetState);
    }

    /**
     * Returns the version of a node, or the lowest version among the current user's nodes.
     *
     * @param nodeId id of the node, or {@code -1} for the lowest version of all nodes of the current user
     * @return a map with the single key {@code "version"}; the map is empty when no version
     *         can be determined (no nodes, nodes without a version, unknown node id)
     */
    @Override
    public Map<String, String> getVersion(Integer nodeId)
    {
        Map<String, String> res = new HashMap<>();
        if (nodeId == -1) {
            String lowestVersion = null;
            for (OpenlookengCluster cluster : openlookengClusterService.listCurrentUserCluster()) {
                String version = cluster.getVersion();
                if (StringUtils.isBlank(version)) {
                    continue;
                }
                //Strict "<" keeps the first node in list order when several share the lowest version
                if (lowestVersion == null || compareDottedVersions(version, lowestVersion) < 0) {
                    lowestVersion = version;
                }
            }
            if (lowestVersion != null) {
                res.put("version", lowestVersion);
            }
        }
        else {
            OpenlookengCluster cluster = openlookengClusterService.getById(nodeId);
            if (cluster != null) {
                res.put("version", cluster.getVersion());
            }
        }
        return res;
    }

    /**
     * Compares two dot separated versions segment by segment. Segments are compared as numbers
     * when both are numeric (so 1.10.0 is higher than 1.2.10) and as text otherwise; a missing
     * segment counts as 0.
     */
    private static int compareDottedVersions(String left, String right)
    {
        String[] leftParts = left.split("\\.");
        String[] rightParts = right.split("\\.");
        int segmentCount = Math.max(leftParts.length, rightParts.length);
        for (int i = 0; i < segmentCount; i++) {
            String leftPart = i < leftParts.length ? leftParts[i] : "0";
            String rightPart = i < rightParts.length ? rightParts[i] : "0";
            int result;
            try {
                result = Long.compare(Long.parseLong(leftPart), Long.parseLong(rightPart));
            }
            catch (NumberFormatException e) {
                //Non-numeric segment such as "0-SNAPSHOT": fall back to text comparison
                result = leftPart.compareTo(rightPart);
            }
            if (result != 0) {
                return result;
            }
        }
        return 0;
    }

    /**
     * Changes the state of one node, or of every node returned by
     * {@code listCurrentUserCluster()}.
     * <p>
     * The affected nodes are split by node type into two groups; the group the
     * original code calls coordinators is processed first, then the workers
     * (node types {@code NODE_TYPE_1} and {@code NODE_TYPE_4}).
     *
     * @param nodeId a node id, or {@code -1} for all nodes of the current user
     * @param state  the launcher action to apply, e.g. {@code "restart"}
     */
    @Override
    public void changeNodeState(Integer nodeId, String state)
    {
        List<Integer> nodeIdList = new ArrayList<>();
        if (-1 == nodeId) {
            nodeIdList = openlookengClusterService.listCurrentUserCluster().stream()
                    .map(OpenlookengCluster::getId)
                    .distinct()
                    .collect(Collectors.toList());
        }
        else {
            nodeIdList.add(nodeId);
        }
        if (nodeIdList.isEmpty()) {
            // Nothing to do. Also avoids building "id IN ()", which MyBatis-Plus
            // emits for an empty collection and which is not valid SQL.
            return;
        }

        //Query by node id
        LambdaQueryWrapper<OpenlookengCluster> wrapper = new LambdaQueryWrapper<>();
        wrapper.in(OpenlookengCluster::getId, nodeIdList);
        List<OpenlookengCluster> list = openlookengClusterService.list(wrapper);

        // true -> worker-type nodes, false -> everything else (treated as coordinators)
        Map<Boolean, List<OpenlookengCluster>> nodesByRole = list.stream().collect(Collectors.partitioningBy(
                item -> Constant.NODE_TYPE_1.getCode().equalsIgnoreCase(item.getNodeType())
                        || Constant.NODE_TYPE_4.getCode().equalsIgnoreCase(item.getNodeType())));

        //The coordinator node is executed first, then the worker nodes are executed
        changeNodeState(nodesByRole.get(false), state);
        changeNodeState(nodesByRole.get(true), state);
    }

    /**
     * Applies a launcher action to every distinct node in {@code list}, one node
     * after the other.
     * <p>
     * For each node the host is checked through {@code getOsInfoFromCache}; on a
     * restart the node configuration is deployed and the node is marked as
     * synchronized before the launcher command is executed over SSH. A failure of
     * the remote command is logged and does not stop the remaining nodes.
     *
     * @param list  nodes to process; only their ids are used, each node is re-read by id
     * @param state launcher action; {@code "start"} is executed as {@code "restart"}
     * @throws BizException if the cached OS info of a node's host has at most one entry,
     *                      which the code treats as a connection failure
     */
    private void changeNodeState(List<OpenlookengCluster> list, String state)
    {
        // "start" is always executed as "restart", so only "restart" needs checking below.
        String action = "start".equalsIgnoreCase(state) ? "restart" : state;
        boolean restarting = "restart".equalsIgnoreCase(action);

        List<Integer> nodeIds = list.stream()
                .map(OpenlookengCluster::getId)
                .distinct()
                .collect(Collectors.toList());
        for (Integer nodeId : nodeIds) {
            OpenlookengCluster openlookengCluster = openlookengClusterService.getById(nodeId);
            Ecs ecs = ecsService.getById(openlookengCluster.getEcsId());
            Map<String, String> ecsInfo = ecsService.getOsInfoFromCache(ecs);
            if (ecsInfo.size() <= 1) {
                throw new BizException("主机:" + ecs.getIp() + "连接异常!");
            }
            //Restart sync configuration
            if (restarting) {
                deployNodeConfigFile(nodeId);
                openlookengCluster.setConfigSynState(ConfigSynStateEnum.SYN.getType());
                try {
                    openlookengClusterService.updateById(openlookengCluster);
                }
                catch (Exception e) {
                    // Best effort: a failed status update must not block the restart itself.
                    log.error("{},Node database update status is abnormal:{}", nodeId, e.getMessage(), e);
                }
            }
            long start = System.currentTimeMillis();
            try {
                String path = getDeployPath(openlookengCluster);
                String runCmd = path + ShellConstants.launcherCommand + action;
                SshUserDTO sshUserDTO = LinuxShellUtils.getOpenLookengUser(ecs);
                String res = LinuxShellUtils.execCmd(sshUserDTO, runCmd);
                long end = System.currentTimeMillis();
                log.info("Change the node state successfully:{},cmd:{},time consuming:{},res:{}", nodeId, runCmd, (end - start), res);
            }
            catch (Exception e) {
                // Best effort per node, but a failure is an error and keeps its stack trace.
                log.error("Change node state exception:{},state:{},err:{}", nodeId, action, e.getMessage(), e);
            }
        }
    }

    /**
     * Changes the state of the given nodes, or of all nodes when no ids are given.
     *
     * @param state   the state forwarded to {@code changeNodeState}
     * @param nodeIds ids of the nodes to change; an empty list is forwarded as
     *                node id {@code -1}
     */
    @Override
    public void changeClassState(String state, List<Integer> nodeIds)
    {
        if (!nodeIds.isEmpty()) {
            for (Integer id : nodeIds) {
                changeNodeState(id, state);
            }
            return;
        }
        // No explicit ids: -1 is the "all nodes" marker understood by changeNodeState.
        changeNodeState(-1, state);
    }

    /**
     * Removes nodes: changes their state first, optionally deletes their
     * installation directories on the hosts, then deletes the node rows and the
     * configuration rows that reference them.
     *
     * @param state           state forwarded to {@code changeClassState} before removal;
     *                        a failure of that step is logged and removal continues
     * @param nodeIds         ids of the nodes to remove; an empty list selects every node
     *                        whose {@code create_user_id} is the current login user
     * @param removeDirectory {@code true} to also delete each node's installation
     *                        directory; {@code null} is treated as {@code false}
     * @throws BizException if deleting an installation directory fails with an SFTP error
     */
    @Override
    public void removeColonyOrNode(String state, List<Integer> nodeIds, Boolean removeDirectory)
    {
        //Stop the instance first, then delete the directory, then delete the database Delete the configuration table
        try {
            this.changeClassState(state, nodeIds);
        }
        catch (Exception e) {
            // Best effort: the nodes are removed even if they could not be stopped.
            log.warn("removeColonyOrNode,Failed to change host state:{}", e.getMessage());
        }

        List<OpenlookengCluster> openlookengClusters;
        List<Integer> targetIds;
        if (nodeIds.isEmpty()) {
            LoginUserInfoDTO currentLoginUser = JwtTokenUtil.getCurrentLoginUser(audience);
            QueryWrapper<OpenlookengCluster> userWrapper = new QueryWrapper<>();
            userWrapper.eq("create_user_id", currentLoginUser.getId());
            openlookengClusters = openlookengClusterMapper.selectList(userWrapper);
            targetIds = openlookengClusters.stream().map(OpenlookengCluster::getId).collect(Collectors.toList());
        }
        else {
            openlookengClusters = openlookengClusterMapper.selectBatchIds(nodeIds);
            targetIds = nodeIds;
        }
        if (targetIds.isEmpty()) {
            // Nothing to delete. Also avoids "IN ()", which MyBatis-Plus emits for an
            // empty id collection and which is not valid SQL.
            return;
        }

        // Boolean.TRUE.equals(...) instead of "== true": no NullPointerException on null.
        if (Boolean.TRUE.equals(removeDirectory) && !openlookengClusters.isEmpty()) {
            removeInstallationDirectories(openlookengClusters);
        }

        openlookengClusterMapper.deleteBatchIds(targetIds);
        QueryWrapper<OpenlookengClusterConfig> wrapper = new QueryWrapper<>();
        wrapper.in("openlookeng_cluster_id", targetIds);
        openlookengClusterConfigMapper.delete(wrapper);
    }

    /**
     * Deletes the installation directory of every given node on its host.
     * <p>
     * Iterates over the nodes rather than over the hosts, so several nodes that
     * live on the same host all get their directory removed. Each host/path pair
     * is removed only once.
     *
     * @param clusters nodes whose installation directories are to be deleted
     * @throws BizException if the remote deletion fails with an SFTP error
     */
    private void removeInstallationDirectories(List<OpenlookengCluster> clusters)
    {
        List<Integer> ecsIds = clusters.stream().map(OpenlookengCluster::getEcsId).distinct().collect(Collectors.toList());
        List<Ecs> hosts = ecsMapper.selectBatchIds(ecsIds);
        List<String> handled = new ArrayList<>();
        for (OpenlookengCluster cluster : clusters) {
            Ecs host = hosts.stream().filter(h -> cluster.getEcsId().equals(h.getId())).findFirst().orElse(null);
            String deployPath = cluster.getInstallationPath();
            if (host == null || deployPath == null || deployPath.trim().isEmpty()) {
                // No host row or no recorded path: there is nothing that can be removed.
                continue;
            }
            String key = host.getId() + "|" + deployPath;
            if (handled.contains(key)) {
                continue;
            }
            handled.add(key);

            SshUserDTO sshUserDTO = new SshUserDTO();
            sshUserDTO.setSshUser(host.getUserName());
            sshUserDTO.setSshPass(Md5Utils.decrypt(host.getPassWord()));
            sshUserDTO.setSshHost(host.getIp());
            sshUserDTO.setSshPort(host.getPort());
            try {
                LinuxShellUtils.removeDirectory(sshUserDTO, deployPath);
            }
            catch (SftpException e) {
                // Keep the root cause in the log; the exception below only carries a message.
                log.error("removeColonyOrNode,Failed to remove directory:{},host:{},err:{}", deployPath, host.getIp(), e.getMessage(), e);
                throw new BizException("访问被拒绝，删除目录失败");
            }
        }
    }
}
